package com.atguigu.flink.datastreamapi.combine;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

/**
 * Demonstrates {@code connect}: two streams with different element types are
 * combined into one {@link ConnectedStreams} and handled by a single
 * {@link CoProcessFunction} that emits one common output type.
 *
 * <p>Every line read from the first socket becomes a default {@link MouseHead}
 * and every line read from the second socket becomes a default
 * {@link DuckNeck}; the text of the line itself is discarded. For each element
 * of either stream a message is written to standard output and a new
 * {@link DuckNeck} is emitted and printed.
 *
 * Created by Smexy on 2023/11/13
 */
public class Demo2_Connect
{
    /** Host that serves both text sockets. */
    private static final String SOCKET_HOST = "hadoop102";

    /** Port whose lines are turned into {@link MouseHead} elements. */
    private static final int MOUSE_HEAD_PORT = 8888;

    /** Port whose lines are turned into {@link DuckNeck} elements. */
    private static final int DUCK_NECK_PORT = 8889;

    /**
     * Builds the two source streams, connects them, processes the connected
     * stream and runs the job.
     *
     * @param args command-line arguments; not used
     * @throws IllegalStateException if job execution fails; the original
     *         exception is attached as the cause
     */
    public static void main(String[] args) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Single parallel instance so the printed output is not interleaved
        // across subtasks.
        env.setParallelism(1);

        // Read the two input streams. The lambdas lose their generic return
        // type to erasure, so the element type is declared with returns(...).
        SingleOutputStreamOperator<MouseHead> ds1 = env
            .socketTextStream(SOCKET_HOST, MOUSE_HEAD_PORT)
            .map(x -> new MouseHead())
            .returns(TypeInformation.of(MouseHead.class));

        SingleOutputStreamOperator<DuckNeck> ds2 = env
            .socketTextStream(SOCKET_HOST, DUCK_NECK_PORT)
            .map(x -> new DuckNeck())
            .returns(TypeInformation.of(DuckNeck.class));

        // Connect the two streams; each side keeps its own element type.
        ConnectedStreams<MouseHead, DuckNeck> ds3 = ds1.connect(ds2);

        // Process the connected stream: one callback per input side, both
        // emitting the same output type.
        ds3
            .process(new CoProcessFunction<MouseHead, DuckNeck, DuckNeck>()
            {
                /** Handles an element of the first ({@link MouseHead}) stream. */
                @Override
                public void processElement1(MouseHead value, Context ctx, Collector<DuckNeck> out) throws Exception {
                    System.out.println("对"+value+"添加鸭肉鲜....");
                    out.collect(new DuckNeck("熟，鲜美"));
                }

                /** Handles an element of the second ({@link DuckNeck}) stream. */
                @Override
                public void processElement2(DuckNeck value, Context ctx, Collector<DuckNeck> out) throws Exception {
                    System.out.println("对"+value+"添加防腐剂....");
                    out.collect(new DuckNeck("熟，鲜美"));
                }
            })
            .print();

        try {
            env.execute();
        } catch (Exception e) {
            // Do not swallow the failure: propagate it (with its cause) so the
            // process / job submission is reported as failed instead of
            // returning normally after printing a stack trace.
            throw new IllegalStateException("Execution of Demo2_Connect failed", e);
        }
    }

    /**
     * Element type of the first stream.
     *
     * <p>Declared public with a public no-argument constructor and Lombok
     * generated getters/setters so that Flink can analyse it as a POJO type
     * rather than falling back to generic serialization.
     */
    @Data
    @NoArgsConstructor
    public static class MouseHead{
        private String name = "鼠头";
    }

    /**
     * Element type of the second stream and of the processed output.
     *
     * <p>Declared public with a public no-argument constructor and Lombok
     * generated getters/setters so that Flink can analyse it as a POJO type
     * rather than falling back to generic serialization.
     */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class DuckNeck{
        private String type = "生";
    }
}
